Add split-and-retry path to GpuProjectExec #14724
thirtiseven wants to merge 5 commits into NVIDIA:main from
Conversation
Signed-off-by: Haoyang Li <haoyangl@nvidia.com>
Greptile Summary: This PR adds a split-and-retry path to GpuProjectExec. Confidence Score: 4/5 — safe to merge after verifying the double-close of pieces batches in runWithSplitRetry when Table.concatenate throws; all other changes are logically correct and well-tested. The incRefCount() guard at both call sites is correctly applied and the new split-retry logic is sound. The one issue is in runWithSplitRetry: closeOnExcept(pieces) still holds live references when buildNonEmptyBatchFromTypes is called; if the concat throws, that function's finally block closes the batches and then closeOnExcept fires a second close on the same cuDF-native objects, driving ref-counts negative. See sql-plugin/src/main/scala/com/nvidia/spark/rapids/basicPhysicalOperators.scala, specifically the closeOnExcept(pieces) scope in runWithSplitRetry.
Flowchart

```mermaid
%%{init: {'theme': 'neutral'}}%%
flowchart TD
  A[projectAndCloseWithRetrySingleBatch\nwithResource sb] --> B[projectWithRetrySingleBatch]
  B --> C{splitRetry.enabled\n&& all deterministic?}
  C -- yes --> D[sb.incRefCount\nrunWithSplitRetry]
  D --> E[withRetry sb\nsplitSpillableInHalfByRows]
  E --> F{OOM?}
  F -- split --> G[run projection\non each half]
  G --> H[collect pieces\nArrayBuffer]
  H --> I[buildNonEmptyBatchFromTypes\nconcat & close pieces]
  I --> J[return single ColumnarBatch]
  F -- no OOM --> K[run projection\non full batch]
  K --> J
  C -- no --> L[withRetryNoSplit path\nexisting logic]
  L --> J
```
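The flow above can be sketched in Scala roughly as follows. This is a simplified sketch, not the actual implementation: `projectOnBatch` is a hypothetical stand-in for the real projection call, while `withRetry`, `splitSpillableInHalfByRows`, `closeOnExcept`, and `buildNonEmptyBatchFromTypes` are the spark-rapids helpers named in this PR.

```scala
// Hedged sketch of the split-and-retry driver. `projectOnBatch` is a
// hypothetical placeholder for projecting one (sub-)batch.
def runWithSplitRetry(sb: SpillableColumnarBatch): ColumnarBatch = {
  // withRetry re-runs the body on a retryable OOM; on a split-and-retry
  // OOM it first halves the spillable input by rows and retries each half.
  val resultIter = RmmRapidsRetryIterator.withRetry(
      sb, RmmRapidsRetryIterator.splitSpillableInHalfByRows) { attempt =>
    projectOnBatch(attempt.getColumnarBatch())
  }
  val pieces = ArrayBuffer[ColumnarBatch]()
  // closeOnExcept covers only the collection loop, so ownership of the
  // pieces transfers cleanly to the concat step below.
  closeOnExcept(pieces) { _ =>
    while (resultIter.hasNext) {
      pieces += resultIter.next()
    }
  }
  val outputTypes = (0 until pieces.head.numCols()).map { i =>
    pieces.head.column(i).asInstanceOf[GpuColumnVector].dataType()
  }.toArray
  // buildNonEmptyBatchFromTypes concatenates the sub-batches back into a
  // single batch and closes the inputs itself, preserving the
  // single-batch contract of projectAndCloseWithRetrySingleBatch.
  ConcatAndConsumeAll.buildNonEmptyBatchFromTypes(pieces.toArray, outputTypes)
}
```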
Reviews (2): Last reviewed commit: "address coemments"
Signed-off-by: Haoyang Li <haoyangl@nvidia.com>
```scala
val pieces = ArrayBuffer[ColumnarBatch]()
closeOnExcept(pieces) { _ =>
  while (resultIter.hasNext) {
    pieces += resultIter.next()
  }
  val outputTypes = (0 until pieces.head.numCols()).map { i =>
    pieces.head.column(i).asInstanceOf[GpuColumnVector].dataType()
  }.toArray
  ConcatAndConsumeAll.buildNonEmptyBatchFromTypes(pieces.toArray, outputTypes)
}
}
```
Double-close of pieces when buildNonEmptyBatchFromTypes throws in the multi-batch path. ConcatAndConsumeAll.buildNonEmptyBatchFromTypes closes all input batches in its finally block (arrayOfBatches.foreach(_.close())). If it throws (e.g., OOM during Table.concatenate), both that finally block and the closeOnExcept(pieces) handler call close on the same GpuColumnVector objects, driving the cuDF native ref-count negative. Narrow the closeOnExcept scope to cover only the collection loop and let buildNonEmptyBatchFromTypes take exclusive ownership for the concat step.
Suggested change — before:

```scala
val pieces = ArrayBuffer[ColumnarBatch]()
closeOnExcept(pieces) { _ =>
  while (resultIter.hasNext) {
    pieces += resultIter.next()
  }
  val outputTypes = (0 until pieces.head.numCols()).map { i =>
    pieces.head.column(i).asInstanceOf[GpuColumnVector].dataType()
  }.toArray
  ConcatAndConsumeAll.buildNonEmptyBatchFromTypes(pieces.toArray, outputTypes)
}
}
```

After:

```scala
val pieces = ArrayBuffer[ColumnarBatch]()
closeOnExcept(pieces) { _ =>
  while (resultIter.hasNext) {
    pieces += resultIter.next()
  }
}
val outputTypes = (0 until pieces.head.numCols()).map { i =>
  pieces.head.column(i).asInstanceOf[GpuColumnVector].dataType()
}.toArray
// Transfer ownership: buildNonEmptyBatchFromTypes closes the batches itself
// (in its finally block). Do not wrap in closeOnExcept here to avoid a
// double-close if Table.concatenate throws.
ConcatAndConsumeAll.buildNonEmptyBatchFromTypes(pieces.toArray, outputTypes)
}
```
Fixes #14191.
Description
GpuProjectExec currently retries OOM via withRetryNoSplit only: if a projection runs cuDF kernels with internal scratch allocations the pre-split estimator cannot see (regex, string-replace, etc.), an OOM during the projection cannot be recovered and fails the task.

This PR adds a split-and-retry path: for purely deterministic projections, GpuProjectExec now drives the projection through RmmRapidsRetryIterator.withRetry(splitSpillableInHalfByRows). On GPU OOM the input batch is halved by rows and the projection is re-run on each half; the resulting sub-batches are concatenated back via ConcatAndConsumeAll.buildNonEmptyBatchFromTypes to preserve the single-batch contract of projectAndCloseWithRetrySingleBatch.

Mixed deterministic + non-deterministic projections fall through to the existing withRetryNoSplit path: the non-deterministic side is computed once on the full input batch and stitched row-by-row to the deterministic side, and row-splitting either side would break that alignment. Both GpuProjectExec.projectWithRetrySingleBatch and GpuTieredProject.projectWithRetrySingleBatchInternal dispatch on the same condition (forall(_.deterministic) / areAllDeterministic).

A new internal config spark.rapids.sql.projectExec.splitRetry.enabled (default true) gates the new path so it can be disabled to revert to the prior behavior if regressions surface.

Checklists
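For illustration, the gate could be flipped like any other spark-rapids config (a hedged example; only the config key and its default come from this PR):

```scala
// Disable the new split-and-retry path to revert to the prior
// withRetryNoSplit-only behavior (default is true).
spark.conf.set("spark.rapids.sql.projectExec.splitRetry.enabled", "false")
```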
Documentation
The new config is internal() and the public behavior is unchanged in non-OOM cases.

Testing
tests/src/test/scala/com/nvidia/spark/rapids/ProjectSplitRetrySuite.scala adds five cases covering both dispatch sites:
- GpuProjectExec.projectAndCloseWithRetrySingleBatch with forceSplitAndRetryOOM produces output equal to a single-batch projection
- GpuSplitAndRetryOOM through the legacy path
- GpuTieredProject.projectAndCloseWithRetrySingleBatch
- a GpuRand projection routes through the legacy retry path (verified by comparing the rand column to a non-injected reference run)
- forceRetryOOM on the new path returns a single piece without splitting

Performance
TBD — perf testing to follow.